

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.io.File;
import java.util.Arrays;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.conf.Configuration;

/** An {@link OutputFormat} that writes {@link SequenceFile}s. */
public class SequenceFileOutputFormat extends OutputFormatBase {

  public RecordWriter getRecordWriter(FileSystem fs, JobConf job,
                                      String name) throws IOException {

    File file = new File(job.getOutputDir(), name);

    final SequenceFile.Writer out =
      new SequenceFile.Writer(fs, file.toString(),
                              job.getOutputKeyClass(),
                              job.getOutputValueClass(),
                              job.getBoolean("mapred.output.compress", false));

    return new RecordWriter() {

        public void write(WritableComparable key, Writable value)
          throws IOException {

          out.append(key, value);
        }

        public void close(Reporter reporter) throws IOException { out.close();}
      };
  }

  /** Open the output generated by this format. */
  public static SequenceFile.Reader[] getReaders(Configuration conf, File dir)
    throws IOException {
    FileSystem fs = FileSystem.get(conf);
    File[] names = fs.listFiles(dir);
    
    // sort names, so that hash partitioning works
    Arrays.sort(names);
    
    SequenceFile.Reader[] parts = new SequenceFile.Reader[names.length];
    for (int i = 0; i < names.length; i++) {
      parts[i] = new SequenceFile.Reader(fs, names[i].toString(), conf);
    }
    return parts;
  }
}

